package com.parkingwang.learning.threadrx;

import io.reactivex.Observable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;

/**
 * observable 发射数据
 * operators  加工数据在后台线程进行
 * observer  在前台线程中响应数据
 */
public class SchedulerRx {


    /**
     * 对数据进行处理subscribeOn() ,多次切换，只对最开始切换的线程有效
     * <p>
     * 下游数据运行在特定线程observerOn()，多次切换，每次都会有效，线程一直在切换
     * note:observeOn 一旦调用，subscribeOn()切换失效
     * <p>
     * 调度器
     *
     * @param args
     */
    public static void main(String[] args) {


        Observable.just(1, 2, 3)
                .observeOn(Schedulers.computation())
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer integer) throws Exception {
                        System.out.println(Thread.currentThread().getName() + "-----" + integer);
                        return "字符map1：" + integer;
                    }
                })
                .subscribeOn(Schedulers.single())
                .subscribeOn(Schedulers.newThread())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        System.out.println(Thread.currentThread().getName() + "-----" + s);
                        return "字符map2：" + s;
                    }
                })
//                .observeOn(Schedulers.computation())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        System.out.println(Thread.currentThread().getName() + "-----" + s);
                        return "字符map3：" + s;
                    }
                })
                .observeOn(Schedulers.io())
                .subscribeOn(Schedulers.single())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println(Thread.currentThread().getName() + "-----" + s);
                    }
                });


        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }


}
